Solutions/Vectra XDR/Data Connectors/VectraDataConnector/SharedCode/collector.py (1,099 lines of code) (raw):
"""This file contains methods for validations, checkpoint, pulling and pushing data."""
import sys
import datetime
import json
import inspect
import requests
import hashlib
from datetime import timedelta
from requests.auth import HTTPBasicAuth
from ..SharedCode import consts, keyvault_secrets_management
from ..SharedCode.azure_sentinel import AzureSentinel
class BaseCollector:
"""This class contains methods to create object and helper methods."""
def __init__(self, applogger, function_name, client_id, client_secret) -> None:
    """Store configuration and credentials and create the collaborating clients.

    Args:
        applogger: logger used for every message this collector emits.
        function_name (str): name of the calling function, included in log lines.
        client_id (str): client id used as the basic-auth user for token requests.
        client_secret (str): client secret used as the basic-auth password.
    """
    # Values handed in by the caller.
    self.applogger = applogger
    self.function_name = function_name
    self.client_id = client_id
    self.client_secret = client_secret

    # Settings read from the shared constants module.
    self.connection_string = consts.CONNECTION_STRING
    self.base_url = consts.BASE_URL
    self.start_time = consts.START_TIME

    # Sentinel client and the HTTP session reused for API calls.
    self.azuresentinel = AzureSentinel()
    http_session = requests.Session()
    http_session.headers.update(
        {
            "User-Agent": consts.USER_AGENT,
            "Content-Type": "application/x-www-form-urlencoded",
        }
    )
    self.session = http_session

    # Token state; populated later from Key Vault and the checkpoint.
    self.access_token = None
    self.refresh_token = None
    self.access_token_expiration = None
    self.refresh_token_expiration = None

    self.keyvault_obj = keyvault_secrets_management.KeyVaultSecretManage(applogger)
def load_tokens_from_keyvault(self):
    """Read the access and refresh tokens from Key Vault when they are stored there.

    A token attribute is only overwritten when its secret name
    (``self.access_token_key`` / ``self.refresh_token_key``) appears in the
    list of secrets reported by the Key Vault helper.
    """
    vault = self.keyvault_obj
    stored_secret_names = vault.get_properties_list_of_secrets()
    # (attribute to fill, attribute holding the secret name), access token first.
    token_slots = (
        ("access_token", "access_token_key"),
        ("refresh_token", "refresh_token_key"),
    )
    for token_attr, key_attr in token_slots:
        secret_name = getattr(self, key_attr)
        if secret_name in stored_secret_names:
            setattr(self, token_attr, vault.get_keyvault_secret(secret_name))
def load_expiration_time_from_checkpoint_file(self):
    """Copy the stored token expiry times from the checkpoint onto the instance.

    Nothing is changed when the checkpoint is empty. Keys missing from the
    stored JSON leave the corresponding attribute set to ``None``.

    Raises:
        Exception: if the checkpoint cannot be read or parsed.
    """
    method_name = inspect.currentframe().f_code.co_name
    try:
        raw_state = self.state.get()
        if raw_state:
            stored = json.loads(raw_state)
            self.access_token_expiration = stored.get(self.access_token_expiry)
            self.refresh_token_expiration = stored.get(self.refresh_token_expiry)
    except Exception as err:
        log_args = (consts.LOGS_STARTS_WITH, method_name, self.function_name)
        self.applogger.error(
            "{}(method={}) : {} : Exception occured while getting data from storage account. : {}".format(
                *log_args, err
            )
        )
        raise Exception(f"Exception occured during getting data from azure : {err}.")
def add_token_expiration_to_checkpoint_file(self, token_data, from_refresh_token: bool):
    """Store fresh token(s) in Key Vault and their expiry time(s) in the checkpoint.

    Args:
        token_data (dict): token response. ``access_token`` and ``expires_in``
            are always read; ``refresh_token`` and ``refresh_expires_in`` are
            read only when ``from_refresh_token`` is falsy.
        from_refresh_token (bool): True when only the access token was renewed.

    Raises:
        Exception: if storing the token(s) or the expiry time(s) fails.
    """
    try:
        method_name = inspect.currentframe().f_code.co_name
        self.access_token = token_data["access_token"]
        self.session.headers["Authorization"] = "bearer {}".format(self.access_token)
        issued_at = datetime.datetime.now()
        self.access_token_expiration = (issued_at + timedelta(seconds=token_data["expires_in"])).isoformat()
        log_args = (consts.LOGS_STARTS_WITH, method_name, self.function_name)

        if from_refresh_token:
            self.applogger.info(
                "{}(method={}) : {} : Access Token Generated from Refresh Token.".format(*log_args)
            )
            expiry_times = {self.access_token_expiry: self.access_token_expiration}
        else:
            self.applogger.info(
                "{}(method={}) : {} : Both access token and refresh token are generated successfully.".format(
                    *log_args
                )
            )
            self.refresh_token = token_data["refresh_token"]
            self.keyvault_obj.set_keyvault_secret(self.refresh_token_key, self.refresh_token)
            refresh_issued_at = datetime.datetime.now()
            self.refresh_token_expiration = (
                refresh_issued_at + timedelta(seconds=token_data["refresh_expires_in"])
            ).isoformat()
            expiry_times = {
                self.access_token_expiry: self.access_token_expiration,
                self.refresh_token_expiry: self.refresh_token_expiration,
            }

        checkpoint_data = dict(expiry_times)
        self.keyvault_obj.set_keyvault_secret(self.access_token_key, self.access_token)
        self.save_checkpoint(data=checkpoint_data, token_expiry_time=True)
    except Exception as err:
        self.applogger.error(
            "{}(method={}) : {} : Error occured in storing token or expiration time: {} ".format(
                consts.LOGS_STARTS_WITH, method_name, self.function_name, err
            )
        )
        raise Exception(
            "Error occured in storing token to keyvault or expiration time to checkpoint file : {}".format(err)
        )
def generate_access_token(self, from_refresh_token: bool):
    """Request a new access token from the OAuth2 endpoint and persist it.

    Args:
        from_refresh_token (bool): when truthy, use the ``refresh_token`` grant
            with ``self.refresh_token``; otherwise use ``client_credentials``.

    Raises:
        Exception: if the request fails or the response status code is not
            200/201. The message carries the status code so the caller's log
            is not empty.
    """
    try:
        method_name = inspect.currentframe().f_code.co_name
        log_args = (consts.LOGS_STARTS_WITH, method_name, self.function_name)
        url = self.base_url + consts.OAUTH2_ENDPOINT
        auth = HTTPBasicAuth(self.client_id, self.client_secret)
        if from_refresh_token:
            data = {
                "grant_type": "refresh_token",
                "refresh_token": self.refresh_token,
            }
            self.applogger.debug(
                "{}(method={}) : {} : Generating access token using refresh token.".format(*log_args)
            )
        else:
            data = {"grant_type": "client_credentials"}
            self.applogger.debug("{}(method={}) : {} : Generating access token.".format(*log_args))

        response = requests.post(url, auth=auth, data=data, timeout=consts.API_TIMEOUT)
        status_code = response.status_code

        if status_code in (200, 201):
            self.add_token_expiration_to_checkpoint_file(response.json(), from_refresh_token)
            # A successful token call resets the consecutive-failure counter.
            self.update_checkpoint_of_disabling_function(make_count_zero=True)
            self.applogger.debug(
                "{}(method={}) : {} : New access token generated successfully ".format(*log_args)
            )
            return

        if status_code in (400, 401, 403, 404):
            # These responses count towards disabling the function app.
            self.update_checkpoint_of_disabling_function()
            self.applogger.error(
                "{}(method={}) : {} : status code={}, reason={}".format(*log_args, status_code, response.text)
            )
            raise Exception("Token request was rejected with status code {}.".format(status_code))

        self.applogger.error(
            "{}(method={}) : {} : Unknown status code: {} ".format(*log_args, status_code)
        )
        raise Exception("Token request returned unknown status code {}.".format(status_code))
    except requests.exceptions.RequestException as err:
        self.applogger.error(
            "{}(method={}) : {} : Error occurred while generating a Access Token : {} ".format(
                consts.LOGS_STARTS_WITH, method_name, self.function_name, err
            )
        )
        raise Exception(f"Error occurred while generating a Access Token : {err}") from err
    except Exception as err:
        self.applogger.error(
            "{}(method={}) : {} : Error occurred while generating a Access Token : {} ".format(
                consts.LOGS_STARTS_WITH, method_name, self.function_name, err
            )
        )
        raise Exception(err) from err
def validate_access_token(self):
    """Make sure a usable access token is available, renewing it when needed.

    Decision order:
      * no access token loaded             -> request a new token pair;
      * access token not expired           -> keep using it;
      * access expired, refresh not expired -> renew via the refresh token;
      * both expired (or expiry unknown)   -> request a new token pair.

    Each expiry is evaluated once, so the decision cannot change between two
    reads of the clock, and real booleans are passed to
    ``generate_access_token``.

    Raises:
        Exception: if generating a token fails.
    """
    try:
        method_name = inspect.currentframe().f_code.co_name
        log_args = (consts.LOGS_STARTS_WITH, method_name, self.function_name)
        if not self.access_token:
            self.applogger.info(
                "{}(method={}) : {} : Access token does not exist.Generating new access token and refresh token.".format(
                    *log_args
                )
            )
            self.generate_access_token(from_refresh_token=False)
        elif not self.is_token_expired(self.access_token_expiration):
            self.applogger.info(
                "{}(method={}) : {} : Using existing and valid access token.".format(*log_args)
            )
        elif not self.is_token_expired(self.refresh_token_expiration):
            self.applogger.info(
                "{}(method={}) : {} : Access token expired. Generating new access token using refresh token.".format(
                    *log_args
                )
            )
            self.generate_access_token(from_refresh_token=True)
        else:
            self.applogger.info(
                "{}(method={}) : {} : Both access token and refresh token are expired or expiration time does not exist.".format(
                    *log_args
                )
            )
            self.generate_access_token(from_refresh_token=False)
    except Exception as err:
        self.applogger.error(
            "{}(method={}) : {} : Error in getting access token. : {}".format(
                consts.LOGS_STARTS_WITH, method_name, self.function_name, err
            )
        )
        raise Exception(err) from err
def is_token_expired(self, token_expiration):
    """Tell whether an expiry timestamp is missing or already in the past.

    Args:
        token_expiration (str): expiry time as an ISO-format string; it is
            compared as text with ``datetime.datetime.now().isoformat()``.

    Returns:
        bool: True when ``token_expiration`` is empty/None or earlier than now.
    """
    if token_expiration:
        now_iso = datetime.datetime.now().isoformat()
        return bool(now_iso > token_expiration)
    return True
def update_checkpoint_of_disabling_function(self, make_count_zero=False):
    """Maintain the failure counter in the checkpoint and disable the app at 10.

    Args:
        make_count_zero (bool): True to reset ``status_code_count`` to zero;
            False to record one more failure.

    Raises:
        Exception: if reading, updating or posting the checkpoint fails, or
            if disabling the function fails.
    """
    try:
        method_name = inspect.currentframe().f_code.co_name
        raw_state = self.state.get()
        state_data = {"disable_function": False}
        if raw_state:
            state_data.update(json.loads(raw_state))
        log_args = (consts.LOGS_STARTS_WITH, method_name, self.function_name)

        # A stored "disabled" flag is cleared together with the counter.
        if state_data.get("disable_function"):
            state_data["disable_function"] = False
            state_data["status_code_count"] = 0
            self.state.post(json.dumps(state_data))

        if make_count_zero:
            if state_data.get("status_code_count") == 0:
                self.applogger.debug(
                    "{}(method={}): {}: Checkpoint file already have total count of status code equal to zero.".format(
                        *log_args
                    )
                )
            else:
                state_data["status_code_count"] = 0
                self.state.post(json.dumps(state_data))
                self.applogger.debug(
                    "{}(method={}): {}: Checkpoint file updated as total count of status code made zero.".format(
                        *log_args
                    )
                )
            return

        failure_count = state_data.get("status_code_count", 0) + 1
        if failure_count < 10:
            state_data["status_code_count"] = failure_count
            self.state.post(json.dumps(state_data))
            self.applogger.debug(
                "{}(method={}): {}: Updated failed status code count in checkpoint file.".format(*log_args)
            )
            return

        # Tenth consecutive failure: persist the flag, then disable the app.
        self.applogger.info("{}(method={}): {}: Function App will be Disabled.".format(*log_args))
        state_data.update({"disable_function": True, "status_code_count": failure_count})
        self.state.post(json.dumps(state_data))
        self.disable_function()
    except Exception as err:
        self.applogger.error(
            "{}(method={}): {}: Error Occured while updating checkpoint file of disabling function. : '{}'".format(
                consts.LOGS_STARTS_WITH, method_name, self.function_name, err
            )
        )
        raise Exception(f"Error Occured while updating checkpoint file of disabling function: {err}")
def disable_function(self):
    """Disable the Function App once the failure count has reached 10.

    The request URL is built from ``consts.DISABLE_FUNCTION_APP_URL`` and is
    authorised with a token from ``azure_token_generator``.

    Raises:
        Exception: if the token cannot be obtained or the disable request
            fails. Before raising, the checkpoint's ``disable_function`` flag
            is reset to False.
    """
    method_name = inspect.currentframe().f_code.co_name
    log_args = (consts.LOGS_STARTS_WITH, method_name, self.function_name)
    try:
        url = (consts.DISABLE_FUNCTION_APP_URL).format(
            consts.SUBSCRIPTION_ID, consts.RESOURCE_GROUP, consts.FUNCTION_APP_NAME
        )
        access_token = self.azure_token_generator()
        headers = {"Authorization": f"Bearer {access_token}"}
        # A timeout keeps a stalled management call from blocking the run forever.
        response = requests.post(url=url, headers=headers, timeout=consts.API_TIMEOUT)
        response.raise_for_status()
        if response.status_code not in (200, 201):
            self.applogger.error(
                "{}(method={}): {}: Error occurred while disabling function. status_code={}, reason={}".format(
                    *log_args, response.status_code, response.text
                )
            )
            raise Exception(
                "Unexpected status code {} while disabling function.".format(response.status_code)
            )
        self.applogger.info("{}(method={}): {}: Function App disabled successfully.".format(*log_args))
    except requests.exceptions.HTTPError as err:
        # Read the status from the exception itself rather than a local that
        # might not be bound.
        status_code = getattr(err.response, "status_code", None)
        self.applogger.error(
            "{}(method={}): {}: Error occurred while disabling function : status code: {}, reason: '{}'.".format(
                *log_args, status_code, err
            )
        )
        self.save_checkpoint(data={"disable_function": False}, disable_function_error=True)
        raise Exception("Error occurred while disabling function: {}".format(err)) from err
    except Exception as err:
        self.applogger.error(
            "{}(method={}): {}: Error occured while disabling function : '{}'.".format(*log_args, err)
        )
        self.save_checkpoint(data={"disable_function": False}, disable_function_error=True)
        raise Exception("Error occurred while disabling function: {}".format(err)) from err
def azure_token_generator(self):
    """Request an Azure access token with the client-credentials grant.

    Returns:
        str: the ``access_token`` value of the token response.

    Raises:
        Exception: if the request fails or the status code is not 200/201.
            Before raising, the checkpoint's ``disable_function`` flag is
            reset to False.
    """
    method_name = inspect.currentframe().f_code.co_name
    log_args = (consts.LOGS_STARTS_WITH, method_name, self.function_name)
    try:
        url = consts.AZURE_AUTHENTICATION_URL.format(consts.AZURE_TENANT_ID)
        data = {
            "client_id": consts.AZURE_CLIENT_ID,
            "client_secret": consts.AZURE_CLIENT_SECRET,
            "grant_type": "client_credentials",
            "scope": consts.AZURE_AUTHENTICATION_SCOPE,
        }
        # NOTE(review): this token request uses GET; the Microsoft identity
        # platform documents the token endpoint as POST - confirm before changing.
        response = requests.get(url=url, data=data, timeout=consts.API_TIMEOUT)
        if response.status_code not in (200, 201):
            self.applogger.error(
                "{}(method={}): {}: Unknown Status code while generating access token for AZURE: {} .".format(
                    *log_args, response.status_code
                )
            )
            # The handler below resets the checkpoint flag once for every failure.
            raise Exception(
                "Unexpected status code {} while generating access token for AZURE.".format(
                    response.status_code
                )
            )
        self.applogger.info("{}(method={}): {}: Azure Token Generated Successfully.".format(*log_args))
        return response.json()["access_token"]
    except Exception as err:
        self.applogger.error(
            "{}(method={}): {}: Error occured while generating access token for AZURE: {}.".format(
                *log_args, err
            )
        )
        self.save_checkpoint(data={"disable_function": False}, disable_function_error=True)
        raise Exception("Error occurred while generating access token for AZURE: {}".format(err)) from err
def validate_params(self, client_id_name, client_secret_name, snapshot=False):
    """Check that the function-app settings are present and well formed.

    Args:
        client_id_name (str): label used in log messages for the client id.
        client_secret_name (str): label used in log messages for the client secret.
        snapshot (bool): when True the ``StartTime`` checks are skipped.

    Raises:
        Exception: if a required value is empty, ``BaseURL`` does not start
            with ``https://``, or ``StartTime`` is malformed or in the future.
    """
    method_name = inspect.currentframe().f_code.co_name
    log_args = (consts.LOGS_STARTS_WITH, method_name, self.function_name)
    required_params = {
        "BaseURL": self.base_url,
        client_id_name: self.client_id,
        client_secret_name: self.client_secret,
        "WorkspaceID": consts.WORKSPACE_ID,
        "WorkspaceKey": consts.WORKSPACE_KEY,
        "Detections_Table_Name": consts.DETECTIONS_TABLE_NAME,
        "Audits_Table_Name": consts.AUDITS_TABLE_NAME,
        "Entity_Scoring_Table_Name": consts.ENTITY_SCORING_TABLE_NAME,
        "Lockdown_Table_Name": consts.LOCKDOWN_TABLE_NAME,
        "Health_Table_Name": consts.HEALTH_TABLE_NAME,
        "AZURE_CLIENT_ID": consts.AZURE_CLIENT_ID,
        "AZURE_CLIENT_SECRET": consts.AZURE_CLIENT_SECRET,
        "AZURE_TENANT_ID": consts.AZURE_TENANT_ID,
        "KeyVaultName": consts.KEYVAULT_NAME,
        "Entities_Table_Name": consts.ENTITIES_TABLE_NAME,
        "Azure_Subscription_Id": consts.SUBSCRIPTION_ID,
        "Azure_Resource_Group_Name": consts.RESOURCE_GROUP,
    }

    # Report every empty setting before failing, so all of them show up in the log.
    unset = [(label, value) for label, value in required_params.items() if not value]
    for label, value in unset:
        self.applogger.error(
            '{}(method={}): {}: "{}" field is not configured. field_value="{}"'.format(*log_args, label, value)
        )
    if unset:
        raise Exception("Error Occurred while validating params. Required fields missing.")

    if not self.base_url.startswith("https://"):
        self.applogger.error(
            '{}(method={}) : {} : "BaseURL" must start with ”https://” schema followed '
            'by hostname. BaseURL="{}"'.format(*log_args, self.base_url)
        )
        raise Exception("Error Occurred while validating params. Invalid format for BaseURL.")

    if snapshot:
        return

    try:
        if not self.start_time:
            # No StartTime configured: start 24 hours before now (UTC).
            day_ago = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc) - datetime.timedelta(
                hours=24
            )
            self.start_time = day_ago.strftime(r"%Y-%m-%dT%H:%M:%SZ")
        else:
            input_date = datetime.datetime.strptime(self.start_time, r"%m/%d/%Y %H:%M:%S")
            if input_date > datetime.datetime.utcnow():
                self.applogger.error(
                    '{}(method={}) : {} : "StartTime" should not be in future. StartTime="{}"'.format(
                        *log_args, self.start_time
                    )
                )
                raise Exception("Error Occurred while validating params. StartTime cannot be in the future.")
            self.start_time = input_date.strftime(r"%Y-%m-%dT%H:%M:%SZ")
    except ValueError:
        self.applogger.error(
            '{}(method={}) : {} : "StartTime" should be in "MM/DD/YYYY HH:MM:SS" (UTC) '
            'format. StartTime="{}"'.format(*log_args, self.start_time)
        )
        raise Exception("Error Occurred while validating params. Invalid StartTime format.")
def validate_connection(self):
    """Load the stored tokens, validate them and set the Authorization header.

    Raises:
        Exception: wraps any error raised while loading or validating tokens.
    """
    method_name = inspect.currentframe().f_code.co_name
    try:
        self.load_tokens_from_keyvault()
        self.load_expiration_time_from_checkpoint_file()
        self.validate_access_token()
        token = self.access_token
        if token:
            self.session.headers["Authorization"] = "bearer {}".format(token)
        success_message = "{}(method={}) : {} : Token or Connection validation is successful.".format(
            consts.LOGS_STARTS_WITH, method_name, self.function_name
        )
        self.applogger.info(success_message)
    except Exception as ex:
        raise Exception(ex)
def pull_data(self, endpoint, params=None):
    """Call a Vectra endpoint, retrying once when ``pull`` asks for a retry.

    Args:
        endpoint (str): endpoint to call; appended to ``self.base_url``.
        params (dict, optional): query parameters for the API. Defaults to None.

    Returns:
        The payload returned by ``pull``. It may be empty (for example ``[]``
        or ``{}``) when the API had nothing to return.

    Raises:
        Exception: if ``pull`` still requests a retry after the second attempt.
    """
    max_attempts = 2
    url = self.base_url + endpoint
    retry_requested, res = True, None
    attempts = 0
    while retry_requested and attempts < max_attempts:
        retry_requested, res = self.pull(url=url, params=params)
        attempts += 1

    # Decide on the retry flag, not on the payload's truthiness: an empty
    # payload from a successful second attempt is a valid result, not a failure.
    if retry_requested:
        self.update_checkpoint_of_disabling_function()
        raise Exception("Retry limit exceeded. Hence exiting the function.")
    return res
def pull(self, url, params=None, data=None, auth=None, method="GET"):
    """Send one request through the shared session and classify the response.

    Args:
        url (str): full URL to call.
        params (dict, optional): query parameters. Defaults to None.
        data (dict, optional): request body. Defaults to None.
        auth (optional): auth object passed to the session call. Defaults to None.
        method (str, optional): "POST" sends a POST; anything else sends a GET.

    Returns:
        tuple: ``(retry, payload)``.
            ``(True, None)`` after a 401, once the token has been renewed;
            ``(False, <decoded JSON>)`` for a 200/201 response;
            ``(False, {})`` when a request exception carries a 404 response.

    Raises:
        Exception: for 400/403/404 responses, unknown status codes, request
            failures and any other unexpected error.
    """
    method_name = inspect.currentframe().f_code.co_name
    log_args = (consts.LOGS_STARTS_WITH, method_name, self.function_name)
    try:
        self.applogger.debug(
            "{}(method={}) : {} request: url='{}' version='{}'".format(
                *log_args, url, self.session.headers["User-Agent"]
            )
        )
        send = self.session.post if method == "POST" else self.session.get
        res = send(url=url, auth=auth, params=params, data=data, timeout=consts.API_TIMEOUT)
        status_code = res.status_code

        if status_code == 401:
            self.applogger.warning(
                "{}(method={}) : {} : Access token is invalid. generating new token.".format(*log_args)
            )
            both_unexpired = not self.is_token_expired(
                self.access_token_expiration
            ) and not self.is_token_expired(self.refresh_token_expiration)
            if both_unexpired:
                # Rejected although not expired by the stored times: request a new pair.
                self.generate_access_token(from_refresh_token=False)
            else:
                self.validate_access_token()
            return True, None

        if status_code in (200, 201):
            self.applogger.debug(
                '{}(method={}) : {} : API call: Response received successfully. url="{}" params="{}"'.format(
                    *log_args, url, params
                )
            )
            self.update_checkpoint_of_disabling_function(make_count_zero=True)
            return False, res.json()

        if status_code in (400, 403, 404):
            self.applogger.error(
                "{}(method={}) : {} : API call: response: url={} status_code={} response={}".format(
                    *log_args, url, status_code, res.text
                )
            )
            self.update_checkpoint_of_disabling_function()
            raise Exception("API call failed with status code {}.".format(status_code))

        self.applogger.error(
            "{}(method={}) : {} : API call: Unknown status code or empty "
            'response: url="{}" status_code="{}" response="{}"'.format(*log_args, url, status_code, res.text)
        )
        raise Exception("Received unknown status code or empty response.")
    except requests.exceptions.RequestException as ex:
        # Connection errors and timeouts carry no response, and the local
        # response variable may never have been bound, so read defensively.
        failed_response = getattr(ex, "response", None)
        failed_status = getattr(failed_response, "status_code", None)
        failed_text = getattr(failed_response, "text", "")
        if failed_status == 404:
            self.applogger.debug(
                '{}(method={}) : {} : API call: Got {} Status Code : url="{}"'
                ' response="{}"'.format(*log_args, failed_status, url, failed_text)
            )
            # Callers unpack a (retry, payload) pair, so return an empty payload.
            return False, {}
        self.applogger.error(
            '{}(method={}) : {} : API call: Unsuccessful response: url="{}" status_code="{}"'
            ' response="{}"'.format(*log_args, url, failed_status, failed_text)
        )
        raise Exception("HTTP Error Occurred while getting response from api.") from ex
    except Exception as ex:
        self.applogger.error(
            '{}(method={}) : {} : API call: Unexpected error while API call url="{}" error="{}"'.format(
                *log_args, url, str(ex)
            )
        )
        raise Exception("Error Occurred while getting response from api.") from ex
def get_checkpoint_field_and_value(self):
    """Return the query field and value from which collection should resume.

    A stored ``from`` value wins, then a stored ``event_timestamp_gte``.
    When neither is present, ``self.start_time`` is used and written to the
    checkpoint as ``event_timestamp_gte``. Other keys already in the
    checkpoint are kept when that value is written.

    Returns:
        tuple: ``(field, value)``.

    Raises:
        Exception: if the checkpoint cannot be read, parsed or written.
    """
    try:
        method_name = inspect.currentframe().f_code.co_name
        raw_state = self.state.get()
        stored = json.loads(raw_state) if raw_state else {}

        if stored.get("from"):
            field, value = "from", stored.get("from")
        elif stored.get("event_timestamp_gte"):
            field, value = "event_timestamp_gte", stored.get("event_timestamp_gte")
        else:
            field, value = "event_timestamp_gte", self.start_time
            # Merge rather than replace: posting only this key would wipe the
            # other values kept in the same checkpoint.
            stored["event_timestamp_gte"] = self.start_time
            self.state.post(json.dumps(stored))

        self.applogger.info(
            '{}(method={}) : {} : Checkpoint field="{}" and value="{}"'.format(
                consts.LOGS_STARTS_WITH, method_name, self.function_name, field, value
            )
        )
        return field, value
    except Exception as ex:
        self.applogger.error(
            '{}(method={}) : {} : Unexpected error while getting checkpoint: err="{}"'.format(
                consts.LOGS_STARTS_WITH, method_name, self.function_name, str(ex)
            )
        )
        raise Exception(ex) from ex
def save_checkpoint(
    self, data, token_expiry_time=False, entity_state=None, entities_endpoint=False, disable_function_error=False
):
    """Merge ``data`` into the stored checkpoint and post the result.

    The first truthy flag, checked in this order, decides what is written:
      * ``token_expiry_time``      - ``data`` (a dict) is merged, posted to ``self.state``;
      * ``entities_endpoint``      - ``data`` is stored as ``last_modified_timestamp``
        and posted to ``entity_state``;
      * ``disable_function_error`` - ``data`` (a dict) is merged, posted to ``self.state``;
      * otherwise                  - ``data`` is stored as ``from`` in ``self.state``.

    Raises:
        Exception: if reading, parsing or posting the checkpoint fails.
    """
    try:
        method_name = inspect.currentframe().f_code.co_name
        raw_state = self.state.get()
        merged = json.loads(raw_state) if raw_state else {}
        log_args = (consts.LOGS_STARTS_WITH, method_name, self.function_name)

        if token_expiry_time:
            merged.update(data)
            self.state.post(json.dumps(merged))
            self.applogger.info(
                '{}(method={}) : {} : successfully saved updated expiration time in checkpoint."{}"'.format(
                    *log_args, data
                )
            )
        elif entities_endpoint:
            merged["last_modified_timestamp"] = data
            entity_state.post(json.dumps(merged))
            self.applogger.info(
                '{}(method={}) : {} : successfully saved checkpoint. last_modified_timestamp="{}"'.format(
                    *log_args, data
                )
            )
        elif disable_function_error:
            merged.update(data)
            self.state.post(json.dumps(merged))
            self.applogger.debug(
                '{}(method={}) : {} : error while disabling function. So making disable_function flag to False.'.format(
                    *log_args
                )
            )
        else:
            merged["from"] = data
            self.state.post(json.dumps(merged))
            self.applogger.info(
                '{}(method={}) : {} : successfully saved checkpoint. from="{}"'.format(*log_args, data)
            )
    except Exception as ex:
        self.applogger.exception(
            '{}(method={}) : {} : Unexpected error while saving checkpoint: err="{}"'.format(
                consts.LOGS_STARTS_WITH, method_name, self.function_name, str(ex)
            )
        )
        raise Exception(ex)
def post_data_to_sentinel(self, data, table_name, fields):
    """Serialize ``data`` to JSON and post it to the given Sentinel table.

    Args:
        data: events to post; serialized with ``json.dumps``.
        table_name (str): destination table name.
        fields: names of event fields whose value is wrapped in a one-element
            list (in place) before posting; skipped when falsy.

    Raises:
        Exception: if the status code returned by the Sentinel client is not
            in ``consts.SENTINEL_ACCEPTABLE_CODES``.
    """
    method_name = inspect.currentframe().f_code.co_name
    if fields:
        for record in data:
            for field_name in fields:
                record[field_name] = [record.get(field_name)]

    payload = json.dumps(data)
    status_code = self.azuresentinel.post_data(payload, table_name)
    log_args = (consts.LOGS_STARTS_WITH, method_name, self.function_name)

    if status_code not in consts.SENTINEL_ACCEPTABLE_CODES:
        self.applogger.error(
            '{}(method={}) : {} : Data is not posted in the table="{}" status_code="{}"'.format(
                *log_args, table_name, status_code
            )
        )
        raise Exception("Error Occurred while posting data into Microsoft Sentinel Log Analytics Workspace.")

    self.applogger.info(
        '{}(method={}) : {} : Successfully posted the data in the table="{}"'.format(*log_args, table_name)
    )
def _get_size_of_chunk_in_mb(self, chunk):
    """Return the size of ``chunk`` in MB, measured on its JSON serialization.

    ``sys.getsizeof`` only counts the container object itself, not the
    events it references, so it cannot be used to enforce a payload limit.
    The events are posted as JSON, so the JSON text is what is measured.

    Args:
        chunk: JSON-serializable object, typically a list of event dicts.

    Returns:
        float: size of the UTF-8 encoded JSON text in mebibytes.
    """
    serialized = json.dumps(chunk)
    return len(serialized.encode("utf-8")) / (1024 * 1024)
def _create_chunks_and_post_to_sentinel(
    self, data, table_name, fields, entity_state=None, entities_endpoint=False
):
    """Post ``data`` to Sentinel, split into chunks below 30 MB when needed.

    Sizes come from ``_get_size_of_chunk_in_mb``. When the whole of ``data``
    is below the limit it is posted in one call. Otherwise each event is
    measured once and events are grouped while the running total stays below
    the limit. After every intermediate chunk the checkpoint is saved with the
    last posted event's ``last_modified_timestamp`` (entities endpoint) or
    ``id``. An event that is itself at or above the limit is logged and skipped.

    Args:
        data (list): events to post.
        table_name (str): destination table name.
        fields: passed through to ``post_data_to_sentinel``.
        entity_state (optional): passed through to ``save_checkpoint``.
        entities_endpoint (bool): selects the checkpoint key, see above.
    """
    method_name = inspect.currentframe().f_code.co_name
    limit_mb = 30
    if self._get_size_of_chunk_in_mb(data) < limit_mb:
        self.post_data_to_sentinel(data, table_name, fields)
        return

    checkpoint_key = "last_modified_timestamp" if entities_endpoint else "id"
    chunk = []
    chunk_size_mb = 0.0
    for event in data:
        # Measure each event once instead of re-measuring the growing chunk.
        event_size_mb = self._get_size_of_chunk_in_mb([event])
        if event_size_mb >= limit_mb:
            self.applogger.error(
                "{}(method={}) : {} : event with id {} is too large to post into the sentinel hence skipping it.".format(
                    consts.LOGS_STARTS_WITH,
                    method_name,
                    self.function_name,
                    event.get("id"),
                )
            )
            continue
        if chunk and chunk_size_mb + event_size_mb >= limit_mb:
            self.post_data_to_sentinel(chunk, table_name, fields)
            self.save_checkpoint(
                data=chunk[-1].get(checkpoint_key),
                entity_state=entity_state,
                entities_endpoint=entities_endpoint,
            )
            chunk = []
            chunk_size_mb = 0.0
        chunk.append(event)
        chunk_size_mb += event_size_mb

    if chunk:
        self.post_data_to_sentinel(chunk, table_name, fields)
def pull_and_push_the_data(
    self,
    endpoint,
    checkpoint_field,
    checkpoint_value,
    table_name,
    fields=None,
    query_params=None,
):
    """Pull events page by page from Vectra and push them into Sentinel.

    Args:
        endpoint (str): events endpoint to call.
        checkpoint_field (str): query parameter name carrying the checkpoint.
        checkpoint_value: value sent for ``checkpoint_field`` on the first page.
        table_name (str): destination table name.
        fields (optional): passed through to the posting helper.
        query_params (dict, optional): extra query parameters. The dict is
            copied, so neither the caller's dict nor a shared default keeps
            the paging keys added here.

    For ``consts.DETECTIONS_ENDPOINT`` the events of type ``host`` and
    ``account`` are enriched with data from ``/api/v3.3/detections`` before
    posting. After each page the ``next_checkpoint`` of the response is saved,
    except for ``consts.ENTITY_SCORING_ENDPOINT`` when it is ``None`` or
    ``"null"``, where the loop stops without saving.
    """
    method_name = inspect.currentframe().f_code.co_name
    log_args = (consts.LOGS_STARTS_WITH, method_name, self.function_name)
    posted_event_count = 0

    # Work on a private copy: a mutable default shared between calls would
    # carry one endpoint's "from" value into the next call.
    query_params = dict(query_params or {})
    query_params.update({"limit": consts.PAGE_SIZE, checkpoint_field: checkpoint_value})

    iter_next = True
    while iter_next:
        res = self.pull_data(endpoint, params=query_params)
        next_checkpoint = res.get("next_checkpoint", None)
        events = res.get("events") or []

        if endpoint == consts.DETECTIONS_ENDPOINT and events:
            self.applogger.debug(
                "{}(method={}) : {} : Trying to collect the additional information from"
                " /detections endpoint for type=host and type=account.".format(*log_args)
            )
            detection_ids = {
                str(event.get("detection_id")) for event in events if event.get("type") in ("host", "account")
            }
            details_by_id = {}
            if detection_ids:
                ids_param = ",".join(detection_ids)
                detection_endpoint = "/api/v3.3/detections"
                page = 1
                while True:
                    self.applogger.debug(
                        "{}(method={}) : {} : GET call to /detections endpoint with URL = {}".format(
                            *log_args, self.base_url + detection_endpoint
                        )
                    )
                    detail_page = self.pull_data(detection_endpoint, params={"id": ids_param, "page": page})
                    for detail in detail_page.get("results") or []:
                        details_by_id[detail.get("id")] = detail
                    if not detail_page.get("next"):
                        break
                    page += 1

            for event in events:
                detail = details_by_id.get(event.get("detection_id"), {})
                event_type = event.get("type")
                if event_type == "host" and detail:
                    event["d_detection_details"] = [detail]
                    event["is_targeting_key_asset"] = str(detail.get("is_targeting_key_asset", ""))
                    event["src_host"] = [detail.get("src_host", {})]
                    event["normal_domains"] = detail.get("normal_domains", [])
                    event["src_ip"] = detail.get("src_ip", "")
                    event["summary"] = [detail.get("summary", {})]
                    event["grouped_details"] = detail.get("grouped_details", [])
                    event["tags"] = detail.get("tags", [])
                elif event_type == "account":
                    # NOTE(review): the nesting of the empty-list fallback was
                    # ambiguous in the flattened source; here every event that
                    # gets no detection details receives [] - confirm.
                    account_detail = event.get("detail", {})
                    event["d_detection_details"] = [account_detail] if account_detail else []
                    if detail:
                        event["tags"] = detail.get("tags", [])
                else:
                    event["d_detection_details"] = []
                self.applogger.debug(
                    "{}(method={}) : {} : Successfully modified events/detections"
                    " response for id={}.".format(*log_args, event.get("id"))
                )

        if events:
            self._create_chunks_and_post_to_sentinel(events, table_name, fields)
            posted_event_count += len(events)
            # A missing remaining_count is treated as "nothing left".
            iter_next = int(res.get("remaining_count") or 0) > 0
            query_params.update({"limit": consts.PAGE_SIZE, "from": next_checkpoint})
        else:
            iter_next = False

        if endpoint == consts.ENTITY_SCORING_ENDPOINT and (next_checkpoint is None or next_checkpoint == "null"):
            break
        self.save_checkpoint(data=next_checkpoint)

    self.applogger.info(
        "{}(method={}) : {} : Posted total {} event(s) into MS Sentinel. No more events."
        " Stopping the collection.".format(*log_args, posted_event_count)
    )
def pull_and_push_the_snapshot_data(
    self,
    endpoint,
    table_name,
    hashed_events_list=None,
    hash_field_list=None,
    fields=None,
):
    """Pull snapshot-type data from Vectra and push it into Sentinel.

    For ``consts.HEALTH_ENDPOINT`` every entry of ``sensors`` is enriched with
    link, traffic, connectivity and traffic-drop values taken from the same
    response, and the whole response is posted as one event.

    For any other endpoint each returned event is hashed over
    ``hash_field_list``; events whose hash is not yet in
    ``hashed_events_list`` are posted, and the list is persisted after each
    newly posted event so a failure midway does not cause reposting.

    Args:
        endpoint (str): endpoint to call.
        table_name (str): destination table name.
        hashed_events_list (list, optional): hashes of events already posted;
            new hashes are appended to it. ``None`` means an empty list.
        hash_field_list (list, optional): event fields the hash is built from.
        fields (optional): passed through to ``post_data_to_sentinel``.
    """
    method_name = inspect.currentframe().f_code.co_name
    # None instead of mutable defaults: a shared default list would keep the
    # hashes appended below across unrelated calls.
    if hashed_events_list is None:
        hashed_events_list = []
    if hash_field_list is None:
        hash_field_list = []

    posted_event_count = 0
    res = self.pull_data(endpoint)
    if res:
        if endpoint == consts.HEALTH_ENDPOINT:
            network = res.get("network", {})
            # A sensor's link is "UP" only when every interface reports "UP".
            link_status_by_luid = {
                luid: (
                    "UP"
                    if all(interface.get("link", "") == "UP" for interface in interfaces.values())
                    else "Degraded"
                )
                for luid, interfaces in network.get("interfaces", {}).get("sensors", {}).items()
            }
            peak_traffic_by_name = {
                name: traffic.get("aggregated_peak_traffic_mbps", "")
                for name, traffic in network.get("traffic", {}).get("sensors", {}).items()
            }
            connectivity_by_name = {
                item.get("name", ""): {"status": item.get("status", ""), "error": item.get("error", "")}
                for item in res.get("connectivity", {}).get("sensors", {})
            }
            trafficdrop_by_name = {
                item.get("name", ""): {"status": item.get("status", ""), "error": item.get("error", "")}
                for item in res.get("trafficdrop", {}).get("sensors", {})
            }

            for sensor in res.get("sensors") or []:
                sensor_name = sensor.get("name", "")
                connectivity = connectivity_by_name.get(sensor_name, {})
                trafficdrop = trafficdrop_by_name.get(sensor_name, {})
                sensor["d_link_status"] = link_status_by_luid.get(sensor.get("luid", ""), "")
                sensor["d_aggregated_peak_traffic"] = peak_traffic_by_name.get(sensor_name, "")
                sensor["d_connectivity_status"] = connectivity.get("status", "")
                sensor["d_connectivity_error"] = connectivity.get("error", "")
                sensor["d_trafficdrop_status"] = trafficdrop.get("status", "")
                sensor["d_trafficdrop_error"] = trafficdrop.get("error", "")

            self.post_data_to_sentinel([res], table_name, fields)
            posted_event_count += 1
        else:
            # Set for O(1) membership; the list keeps order for the checkpoint.
            already_posted = set(hashed_events_list)
            for event in res:
                event_hash = self.get_results_hash({field: event.get(field) for field in hash_field_list})
                if event_hash in already_posted:
                    continue
                # NOTE(review): a single dict is posted here, while
                # post_data_to_sentinel iterates its data when ``fields`` is
                # set - presumably ``fields`` is falsy for these endpoints; confirm.
                self.post_data_to_sentinel(event, table_name, fields)
                posted_event_count += 1
                hashed_events_list.append(event_hash)
                already_posted.add(event_hash)
                self.save_checkpoint_snapshot(hashed_events_list)

    self.applogger.info(
        "{}(method={}) : {} : Posted total {} event(s) into MS Sentinel. No more events."
        " Stopping the collection.".format(
            consts.LOGS_STARTS_WITH,
            method_name,
            self.function_name,
            posted_event_count,
        )
    )
def get_results_hash(self, data):
    """Compute a SHA512 digest of the given data.

    The data is serialized to JSON with sorted keys before hashing, so two
    dicts holding the same content in a different key order produce the
    same digest.

    Args:
        data: JSON-serializable data to be hashed.

    Returns:
        str: SHA512 hexdigest of the serialized data.
    """
    serialized = json.dumps(data, sort_keys=True)
    return hashlib.sha512(serialized.encode()).hexdigest()
def get_checkpoint_snapshot(self):
    """Fetch the snapshot hash list from the checkpoint state.

    Reads the raw state through ``self.state.get()``. When state exists it is
    parsed as JSON and the value stored under "snapshot" is returned. When no
    state exists, a new state holding an empty snapshot list is posted and an
    empty list is returned.

    Returns:
        list: The stored snapshot, or an empty list when there is no stored
        state or the state carries no (or a null) "snapshot" entry.

    Raises:
        Exception: Wraps any error raised while reading, parsing or posting
        the state.
    """
    # Bound before the try block so the except handler can always reference it.
    __method_name = inspect.currentframe().f_code.co_name
    try:
        raw_checkpoint = self.state.get()
        if raw_checkpoint:
            # The state may hold keys other than "snapshot" (saving a snapshot
            # merges into the existing JSON), so fall back to an empty list
            # instead of handing None to callers that expect a list.
            checkpoint = json.loads(raw_checkpoint).get("snapshot") or []
            self.applogger.info(
                "{}(method={}) : {} : Checkpoint list fetched successfully. checkpoint={}".format(
                    consts.LOGS_STARTS_WITH,
                    __method_name,
                    self.function_name,
                    checkpoint,
                )
            )
        else:
            checkpoint = []
            self.state.post(json.dumps({"snapshot": checkpoint}))
            self.applogger.info(
                "{}(method={}) : {} : Checkpoint list not found. Created new checkpoint list.".format(
                    consts.LOGS_STARTS_WITH, __method_name, self.function_name
                )
            )
        return checkpoint
    except Exception as ex:
        self.applogger.error(
            '{}(method={}) : {} : Unexpected error while getting checkpoint list: err="{}"'.format(
                consts.LOGS_STARTS_WITH, __method_name, self.function_name, str(ex)
            )
        )
        # Keep the generic Exception type callers see, but preserve the cause.
        raise Exception(ex) from ex
def save_checkpoint_snapshot(self, value):
    """Store the snapshot value in the checkpoint state.

    The current state is read through ``self.state.get()``; when present it is
    parsed as JSON and its "snapshot" entry is replaced with ``value`` while
    the other entries are kept. The merged result is posted back through
    ``self.state.post()``.

    Args:
        value: Snapshot data to store under the "snapshot" key.

    Raises:
        Exception: Wraps any error raised while reading, parsing or posting
        the state.
    """
    try:
        __method_name = inspect.currentframe().f_code.co_name
        stored_state = self.state.get()
        # Start from the existing state (if any) so unrelated keys survive.
        merged_state = json.loads(stored_state) if stored_state else {}
        merged_state.update(snapshot=value)
        self.state.post(json.dumps(merged_state))
        success_message = "{}(method={}) : {} : successfully saved checkpoint.".format(
            consts.LOGS_STARTS_WITH, __method_name, self.function_name
        )
        self.applogger.info(success_message)
    except Exception as ex:
        failure_message = (
            '{}(method={}) : {} : Unexpected error while saving checkpoint: err="{}"'.format(
                consts.LOGS_STARTS_WITH, __method_name, self.function_name, str(ex)
            )
        )
        self.applogger.exception(failure_message)
        raise Exception(ex)
def get_entities_checkpoint(self, entity_state):
    """Read the entities checkpoint value from the given state manager.

    Args:
        entity_state (object): State manager object; its ``get()`` result is
            parsed as JSON when it is not empty.

    Returns:
        The value stored under "last_modified_timestamp" when the state holds
        a truthy one, otherwise None.

    Raises:
        Exception: Wraps any error raised while reading or parsing the state.
    """
    try:
        __method_name = inspect.currentframe().f_code.co_name
        self.applogger.debug(
            "{}(method={}) : {} : Fetching last data from checkpoint file.".format(
                consts.LOGS_STARTS_WITH, __method_name, self.function_name
            )
        )
        raw_state = entity_state.get()
        if not raw_state:
            # Nothing has been stored yet.
            return None
        last_modified = json.loads(raw_state).get("last_modified_timestamp", None)
        if not last_modified:
            # State exists but carries no usable timestamp.
            return None
        self.applogger.info(
            '{}(method={}) : {} : Checkpoint value="{}"'.format(
                consts.LOGS_STARTS_WITH,
                __method_name,
                self.function_name,
                last_modified,
            )
        )
        return last_modified
    except Exception as ex:
        self.applogger.error(
            '{}(method={}) : {} : Unexpected error while getting checkpoint: err="{}"'.format(
                consts.LOGS_STARTS_WITH, __method_name, self.function_name, str(ex)
            )
        )
        raise Exception(ex)
def merge_assignment_details(self, entity_response, entity_type):
    """
    Enrich entities with their URL and assignment owner.

    Pulls unresolved assignments for the given entity ids from the assignment
    endpoint, then updates every entity in place: a "url" entry is always
    set, and "assigned_to_username" is set when an assignment matching the
    entity id was returned.

    Args:
        entity_response (list): List of entities
        entity_type (str): Type of the entity (host/account)

    Returns:
        list: The same list of entities, updated in place

    Raises:
        Exception: Wraps any error raised while pulling or merging the data.
    """
    __method_name = inspect.currentframe().f_code.co_name
    try:
        self.applogger.debug(
            "{}(method={}) : {} : Trying to collect information from {} endpoint.".format(
                consts.LOGS_STARTS_WITH,
                __method_name,
                self.function_name,
                consts.ASSIGNMENT_ENDPOINT,
            )
        )
        entity_type_params = entity_type + "s"
        entity_ids = ",".join([str(entity.get("id")) for entity in entity_response])
        if not entity_ids:
            # No entities means nothing to look up and nothing to enrich.
            return entity_response

        assignment_data = self.pull_data(
            endpoint=consts.ASSIGNMENT_ENDPOINT,
            params={
                "resolved": False,
                "page_size": consts.PAGE_SIZE,
                entity_type_params: entity_ids,
            },
        )
        self.applogger.debug(
            "{}(method={}) : {} : Collected information from {} endpoint.".format(
                consts.LOGS_STARTS_WITH,
                __method_name,
                self.function_name,
                consts.ASSIGNMENT_ENDPOINT,
            )
        )

        # Index the assignments by the id of the entity they belong to.
        assignments_by_entity = {}
        assignment_results = assignment_data.get("results")
        if assignment_results:
            id_field = entity_type + "_id"
            assignments_by_entity = {
                assignment.get(id_field): assignment for assignment in assignment_results
            }

        for entity in entity_response:
            entity_id = entity.get("id")
            entity['url'] = consts.BASE_URL + '/{}/{}?pivot={}'.format(
                entity_type_params, entity_id, consts.USER_AGENT
            )
            assignment_details = assignments_by_entity.get(entity_id, {})
            if assignment_details:
                assignee = assignment_details.get("assigned_to", {})
                entity["assigned_to_username"] = assignee.get("username", "")
        return entity_response
    except Exception as ex:
        self.applogger.error(
            '{}(method={}) : {} : Unexpected error while getting assignments data: err="{}"'.format(
                consts.LOGS_STARTS_WITH, __method_name, self.function_name, str(ex)
            )
        )
        raise Exception(ex)
def pull_and_push_entities_data(
    self,
    endpoint,
    checkpoint_value,
    table_name,
    entity_type,
    entity_state,
    fields=None,
):
    """
    To pull the entities data from vectra and push into sentinel.

    Pages through the endpoint ordered by "last_modified_timestamp". For each
    non-empty page the entities are merged with their assignment details,
    posted to sentinel, and the "last_modified_timestamp" of the last entity
    on the page is saved as the checkpoint. Paging stops when the response
    carries no "next" link.

    Args:
        endpoint (str): endpoint url
        checkpoint_value (str): Checkpoint value from checkpoint file
        table_name (str): Table name to ingest data into sentinel
        entity_type (str): Type of the entity (host/account)
        entity_state (object): State manager object
        fields (list): list of fields

    Raises:
        Exception: Wraps any error raised while pulling, posting or saving
        the checkpoint.
    """
    __method_name = inspect.currentframe().f_code.co_name
    try:
        self.applogger.debug(
            "{}(method={}) : {} Pull and Push entities data".format(
                consts.LOGS_STARTS_WITH, __method_name, self.function_name
            )
        )
        posted_entities_count = 0
        page = 1
        params = {
            "page_size": consts.PAGE_SIZE,
            "ordering": "last_modified_timestamp",
            "type": entity_type,
        }
        if checkpoint_value:
            params["last_modified_timestamp_gte"] = checkpoint_value
        while True:
            params["page"] = page
            # Logged before the request so the message matches what happens next.
            self.applogger.debug(
                "{}(method={}) : {} : Trying to collect information from"
                " /entities endpoint.".format(
                    consts.LOGS_STARTS_WITH, __method_name, self.function_name
                )
            )
            res = self.pull_data(endpoint, params=params)
            # A missing or null "results" entry is treated as an empty page
            # rather than failing on len(None).
            entity_response = res.get("results") or []
            next_link = res.get("next", None)
            if entity_response:
                # The page is ordered by last_modified_timestamp, so the last
                # entity carries the value used as the next checkpoint.
                last_modified_timestamp = entity_response[-1].get(
                    "last_modified_timestamp"
                )
                merged_entity_response = self.merge_assignment_details(
                    entity_response, entity_type
                )
                self._create_chunks_and_post_to_sentinel(
                    merged_entity_response,
                    table_name,
                    fields,
                    entity_state,
                    entities_endpoint=True,
                )
                posted_entities_count += len(entity_response)
                self.applogger.info(
                    "{}(method={}) : {} : Posted Entities Count : {}".format(
                        consts.LOGS_STARTS_WITH,
                        __method_name,
                        self.function_name,
                        posted_entities_count,
                    )
                )
                # Checkpoint only after the page was posted successfully.
                self.save_checkpoint(
                    data=last_modified_timestamp, entity_state=entity_state, entities_endpoint=True
                )
            if not next_link:
                break
            page += 1
    except Exception as ex:
        self.applogger.error(
            '{}(method={}) : {} : Unexpected error while getting or posting entities data: err="{}"'.format(
                consts.LOGS_STARTS_WITH, __method_name, self.function_name, str(ex)
            )
        )
        # Keep the generic Exception type callers see, but preserve the cause.
        raise Exception(ex) from ex